
Upgrade Kafka to 4.1.1 #2342

Open

egyedt wants to merge 3 commits into linkedin:main from egyedt:bump_kafka-4.1.1

Conversation

Contributor

@egyedt egyedt commented Dec 9, 2025

Summary

Bump Kafka version to 4.1.1.

Categorization

  • documentation
  • bugfix
  • new feature
  • refactor
  • security/CVE
  • other

This PR resolves #2341.

@egyedt egyedt force-pushed the bump_kafka-4.1.1 branch 3 times, most recently from 6a07fb1 to 1d2fd41 on December 11, 2025 at 15:43
egyedt added a commit to egyedt/cruise-control that referenced this pull request Dec 12, 2025
Change-Id: I689c3002a417f3e3bd319f714df3bf6a1994b8c0
Comment thread on gradle.properties:

  org.gradle.jvmargs=-Xms512m -Xmx512m
  scalaVersion=2.13.13
- kafkaVersion=4.0.0
+ kafkaVersion=4.1.1

Contributor Author

good point, thanks!

Change-Id: I605570ae67238e2a9270531628625045cdbbccdd
  readLogDirs(config);
  _id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG));
- _metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG));
+ _metadataLogDir = new File((String) config.get(MetadataLogConfig.METADATA_LOG_DIR_CONFIG));

Contributor

In an effort to avoid depending on internal/private Kafka APIs like MetadataLogConfig [1], could we create a field like the following:

public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";

in the KafkaServerConfigs class [2] instead?

[1] #2282
[2] https://github.com/linkedin/cruise-control/blob/main/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/KafkaServerConfigs.java
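
For reference, a rough sketch of what that approach could look like (illustrative only; the constant simply mirrors Kafka's public "metadata.log.dir" config key, and the call site is the one touched in this diff):

  // In the KafkaServerConfigs test utility class linked above:
  public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";

  // The call site would then reference it instead of the internal Kafka class:
  _metadataLogDir = new File((String) config.get(KafkaServerConfigs.METADATA_LOG_DIR_CONFIG));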

Contributor Author

ack


@kyguy, do you think there's a way we can clearly instruct people to avoid adding dependencies on internal Kafka APIs? Otherwise we would gradually undo your previous work to decouple CC from them.

Maybe a README heads up, maybe a contribution guide, maybe a PR template hint...

Contributor

> Maybe a README heads up, maybe a contribution guide, maybe a PR template hint...

Those all sound like good ideas to me. It'll definitely be a lot easier after we have finished the migration off of internal Kafka APIs and have the related dependencies removed from the gradle file. Then we could have some sort of check in the PR template to make sure no internal Kafka dependencies have been added to the gradle file as part of the PR.
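
As a rough sketch, such a PR template check could be a checklist item along these lines (wording is only a suggestion):

  - [ ] This change does not add new dependencies on internal Kafka APIs
        (e.g. org.apache.kafka.storage.internals.*) to the code or the gradle files.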

  props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
  props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath());
- props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());
+ props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());

Contributor

Same comment as above.

Contributor Author

ack

  private void initializeMetaData(KafkaConfig config) {
    Set<File> allLogDirs = new HashSet<>(readLogDirs(config));
-   allLogDirs.add(new File(config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)));
+   allLogDirs.add(new File(config.getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)));

Contributor

Same comment as above.

Contributor Author

ack


  @Test
+ @Ignore("Container stop and start will destroy Kafka broker and create a new one so this test is not valid in this form.")
  public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException {

Contributor

Even if we are recreating a broker here, isn't the test still valid? From what I understand, the Cruise Control metrics reporter agent on the recreated broker will still update the metrics topic config, won't it?

[1]

  protected void maybeUpdateTopicConfig() {
    try {
      // Retrieve topic config to check and update.
      ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, _cruiseControlMetricsTopic);
      DescribeConfigsResult describeConfigsResult = _adminClient.describeConfigs(Collections.singleton(topicResource));
      Config topicConfig = describeConfigsResult.values().get(topicResource).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
      Set<AlterConfigOp> alterConfigOps = new HashSet<>();
      Map<String, String> configsToSet = new HashMap<>();
      configsToSet.put(TopicConfig.RETENTION_MS_CONFIG, _metricsTopic.configs().get(TopicConfig.RETENTION_MS_CONFIG));
      configsToSet.put(TopicConfig.CLEANUP_POLICY_CONFIG, _metricsTopic.configs().get(TopicConfig.CLEANUP_POLICY_CONFIG));
      maybeUpdateConfig(alterConfigOps, configsToSet, topicConfig);
      if (!alterConfigOps.isEmpty()) {
        AlterConfigsResult alterConfigsResult = _adminClient.incrementalAlterConfigs(Collections.singletonMap(topicResource, alterConfigOps));
        alterConfigsResult.values().get(topicResource).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
      }
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
      LOG.warn("Unable to update config of Cruise Control metrics topic {}", _cruiseControlMetricsTopic, e);
    }
  }

Contributor Author

I think TestContainers is not OK with it. I tested this multiple times and it is flaky; it can get stuck in a bad state (the remaining node keeps trying to connect to the node that no longer exists, and the destroyed node cannot be recreated due to Kafka errors).
It may need deeper investigation, but I have disabled this test for now, since it was very flaky on my PRs.

Contributor

I took a closer look at this today and was able to reproduce the issue on my side. It appears that this test was never fully migrated for KRaft clusters. Currently, it sets up a cluster with only two brokers, so when the test restarts one of them, the quorum is temporarily lost. As a result, the check for the topic metadata change times out while waiting for the quorum to recover.

From what I understand, Kafka 4.x enforces stricter Raft semantics compared to Kafka 3.9.1, which is why this test used to pass consistently before the upgrade.

To address this issue, we can create a cluster with 3 brokers, so that a majority (2 of 3) remains available and the quorum isn't lost when one of the brokers is restarted. We can do this by adding the following to the class:

  private static final int NUM_OF_BROKERS = 3;

  @Override
  protected int clusterSize() {
    return NUM_OF_BROKERS;
  }

With this change the test seems to pass consistently on my side; let me know what you think and/or if it works for you!

Contributor Author

Thanks for investigating it. I applied this fix in a new commit; since the flakiness became more frequent with Kafka 4+, it makes sense to fix it in this PR.

import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.UnifiedLog;

Contributor

Same comment for this as well: is there a way we can avoid using private Kafka APIs?
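
(As an illustration only, and assuming the internal imports are needed just for topic config key names: the public org.apache.kafka.common.config.TopicConfig constants from kafka-clients could stand in for the internal LogConfig ones. Whether the UnifiedLog and ServerTopicConfigSynonyms usages have public equivalents would need checking.)

  // Hypothetical substitution using only public kafka-clients APIs:
  import org.apache.kafka.common.config.TopicConfig;

  Map<String, String> topicConfigs = Map.of(
      TopicConfig.RETENTION_MS_CONFIG, "3600000",
      TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);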

Contributor Author

ack

egyedt added 2 commits January 6, 2026 12:55
Change-Id: Ic258099bb3d55c44696aff7aa985fdb3cc43bec1
Change-Id: I3288df802040d66783d57467ea6b6e2cf4e8ee4e
Contributor

@kyguy kyguy left a comment


Thanks for the changes @egyedt, looks great!

Contributor Author

egyedt commented Jan 13, 2026

@bgrishinko
This PR has an approval and is ready to be merged.
Thanks in advance!
